package com.fary.chapter04_executors.ext;

import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 线程池
 */
public class FaryThreadPoolExecutor implements FaryExecutorService {

    private static final FaryRejectedExecutionHandler defaultHandler = new FaryThreadPoolExecutor.FaryAbortPolicy();
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Condition termination = mainLock.newCondition();

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private final BlockingQueue<Runnable> workQueue;
    private volatile long keepAliveTime;
    private volatile FaryThreadFactory threadFactory;
    private volatile FaryRejectedExecutionHandler handler;

    private final HashSet<FaryThreadPoolExecutor.FaryWorker> workers = new HashSet<>();
    private int largestPoolSize;
    private volatile boolean allowCoreThreadTimeOut;
    private long completedTaskCount;
    private static final boolean ONLY_ONE = true;


    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    private void decrementWorkerCount() {
        ctl.addAndGet(-1);
    }
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }


    public FaryThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, FaryExecutors.defaultThreadFactory(), defaultHandler);
    }

    public FaryThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  FaryThreadFactory threadFactory,
                                  FaryRejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    @Override
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 如果当前线程数小于核心线程数大小执行addWorker()方法，增加一个线程执行
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 成功执行addWorker()就返回
            if (addWorker(command, true))
                return;
            // 没有成功执行获取最新的当前线程数
            c = ctl.get();
        }
        // 如果是运行状态，并且加入等待队列成功执行if块（额外含义：线程池是运行状态已经达到核心线程数，优先放入队列）
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次判断如果线程池不是运行态了并且移除本次提交任务成功，执行拒绝操作
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果是运行状态，或者线程不是运行态但是移除任务队列失败，
            //则检查是否有工作线程在消费队列，如果有则什么都不做（可以确保刚提交进队列的任务被完成），
            //如果没有需要建立一个消费线程用来消费刚刚提交的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果不是运行态或者加入队列失败那么尝试执行提交过来的任务，如果执行失败，走拒绝操作（额外含义：核心线程数满了，队列也满了，尝试建立新的线程消费，新线程数要小于最大线程数）
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * 1、addWorker(Runnable,true)小于核心线程数使用
     * 2、addWorker(Runnable,false)大于核心线程数，并且等待队列也满了情况使用
     * 3、addWorker(null,true)没有任务创建一个线程等待任务到来使用（小于核心线程数的情况）
     * 4、addWorker(null,false)没有任务创建一个线程等待任务到来使用（小于最大线程数的情况）
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // 线程池状态RUNNING= -1；SHUTDOWN=0；STOP=1；TIDYING=2；TERMINATED=3
            // 如果线程池状态是SHUTDOWN及以后的任意一种状态，说明调用了关闭线程池的方法
            // 并且[线程池状态是STOP及以上的状态，或者传进来的任务不是空，或者工作队列等于空]
            // 这个判断条件是为了处理上个方法代码2处的情况，
            // 即线程池已经不是运行态（仅仅调用了shutdown方法），并且弹出队列失败，
            // 这种情况需要保证提交上来的任务得到执行，因此传过来一个null的任务，
            // 目的是为了让线程池启动一个线程执行刚提交的任务，
            //（隐含shutdown状态添加到队列中的任务（移除失败的）还是会被执行），
            // 如果已经不只是SHUTDOWN证明掉用过shutdownnow方法，直接返回false，
            // 或者仅调用shutdown后又来的新任务也返回false拒绝执行，
            // 或者是刚添加到队列的任务已经被其他线程消费过了，也返回false
            if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
                return false;

            for (;;) {
                // 检查工作线程数，如果大于线程池最大上限CAPACITY（即使用int低29位可以容纳的最大值）
                // 或者跟边界值比较已经到达边界值都返回false
                if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 如果增加工作数成功跳出循环往下执行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                // 如果增加工作线程数失败(可能调用了shutdown方法)，
                // 如果两次状态不一致则跳转到retry处重新尝试执行
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // 都没发生循环执行
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        FaryThreadPoolExecutor.FaryWorker w = null;
        try {
            // 把传进来的任务包装成worker对象
            w = new FaryThreadPoolExecutor.FaryWorker(firstTask);
            // 实际上t就是worker对象，只不过有名字等相关信息
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 再次检查线程池状态
                    int c = ctl.get();
                    // 如果是运行态直接执行，或如果是shutdown状态但传进来是个null，即前边说的移除队列失败情况
                    if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 检查这个对象是否被其他线程执行过
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 加入到workers中
                        workers.add(w);
                        int s = workers.size();
                        // 如果大于曾经执行过的最大线程数则最大线程数加1
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果增加成功启动新线程执行
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果启动失败从workers中移除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    private void addWorkerFailed(FaryThreadPoolExecutor.FaryWorker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    final void runWorker(FaryThreadPoolExecutor.FaryWorker w) {
        // 当前线程
        Thread wt = Thread.currentThread();
        // 提交上来的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 调用Worker类的tryRelease()方法，将state置为0，
        w.unlock();
        // 而interruptIfStarted()中只有state>=0才允许调用中断
        boolean completedAbruptly = true;
        try {
            // 先执行提交上来的任务，完成后循环从队列中取任务执行
            while (task != null || (task = getTask()) != null) {
                // 加锁保证调用中断后运行的任务可以正常完成
                w.lock();
                // 执行新任务前要做以下判断
                // 1.如果线程池状态是大于等于stop(调用shutdownnow方法了)，
                // 直接查看当前线程符合未设置中断位 则直接调用wt.interrupt()方法设置
                // 2.如果线程池不是大于等于stop状态，则调用Thread.interrupted()清除interrupt位，
                // 这时如果程池为大于stop状态（有其他线程调用线程池的stopnow方法），
                // 再查看当前线程符合未设置中断位，如果没设置调用wt.interrupt()方法设置
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    // 线程池是运行态不会走到这
                    // 尝试终止正在执行的任务，这里仅仅设置一个标志位
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        // 直接调用run方法，在当前线程中执行
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            // 线程池状态RUNNING= -1；SHUTDOWN=0；STOP=1；TIDYING=2；TERMINATED=3
            // 如果线程池大于等于SHUTDOWN（调用过shutdown方法），
            // 判断是否是stop（调用shutdownnow）之后的状态或者等待队列已经为空
            // 言外之意调用过shutdownnow将停止执行等待队列中的任务，
            // 还有只掉用过shutdown方法会保证工作队列中的任务会被执行完
            if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                // 已经调用shutdown或者等待队列中的任务已经执行完，如果调用shutdownnow队列中的任务还没执行完那就放弃执行
                // 减少工作线程数
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // 工作线程数大于核心线程数或者核心线程超时时间为真（默认为false）
            // allowCoreThreadTimeOut为true超时会关闭线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 1.工作线程数大于最大线程数或【超时关闭标志位真且真的超时了】
            // 2.上个条件成立（言外之意工作线程数大于最大线程数或者已经查过空闲时间没任务，此时可能需要关闭一个线程了），
            // 并且确实有线程在工作（有工作线程才需要关闭），或者任务队列没工作任务了（没任务了对应的是超时那种情况）
            // 3.可能情况：1.wc > maximumPoolSize成立，wc > 1成立：大于核心线程数，有线程在运行，关闭一个线程
            //            2.wc > maximumPoolSize成立，workQueue.isEmpty() 成立：大于核心线程数，队列中已经没有任务可执行，关闭一个线程
            //            3.(timed && timedOut)成立，wc > 1 成立：线程空闲超时，有线程在运行，关闭一个线程
            //            4.(timed && timedOut)成立，workQueue.isEmpty()成立：线程空闲超时，队列中没有可执行的任务
            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                // 工作数量减1并返回null 返回null上层方法就会结束当前线程
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 如果上述情况不满足则正常取任务执行
                // 没有任务会挂起指定时间（言外之意已经大于核心数或者有超时时间的不能永久的阻塞下去）
                // 没有任务会阻塞直到有任务来
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    private void processWorkerExit(FaryThreadPoolExecutor.FaryWorker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (FaryWorker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    protected void terminated() { }

    public FaryThreadFactory getThreadFactory() {
        return threadFactory;
    }

    private final class FaryWorker extends AbstractQueuedSynchronizer implements Runnable {

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        FaryWorker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            // 指向提交过来的任务
            this.firstTask = firstTask;
            // 指向自己
            this.thread = getThreadFactory().newThread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }

        public void lock() { acquire(1); }
        public void unlock() { release(1); }
        public boolean tryLock()  { return tryAcquire(1); }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    public static class FaryAbortPolicy implements FaryRejectedExecutionHandler {

        public FaryAbortPolicy() { }

        public void rejectedExecution(Runnable r, FaryThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
}
